Flink开发 | 您所在的位置:网站首页 › flink 时间窗口不固定 › Flink开发 |
Flink开发-事件时间窗口EventTimeWindows
1.Non-Keyed Tumbling Windows2.Keyed Tumbling Windows3.Keyed Session Windows
Event Time指的是数据流中每个元素或者每个事件自带的时间属性,一般是事件发生的时间。由于事件从发生到进入Flink时间算子之间有很多环节,一个较早发生的事件因为延迟可能较晚到达,因此使用Event Time意味着事件到达有可能是乱序的。 使用Event Time时,最理想的情况下,我们可以一直等待所有的事件到达后再进行时间窗口的处理。假设一个时间窗口内的所有数据都已经到达,基于Event Time的流处理会得到正确且一致的结果。无论我们是将同一个程序部署在不同的计算环境,还是在相同的环境下多次计算同一份数据,都能够得到同样的计算结果。我们根本不同担心乱序到达的问题。 但这只是理想情况,现实中无法实现,因为我们既不知道究竟要等多长时间才能确认所有事件都已经到达,更不可能无限地一直等待下去。在实际应用中,当涉及到对事件按照时间窗口进行统计时,Flink会将窗口内的事件缓存下来,直到接收到一个Watermark,Watermark假设不会有更晚数据的到达。Watermark意味着在一个时间窗口下,Flink会等待一个有限的时间,这在一定程度上降低了计算结果的绝对准确性,而且增加了系统的延迟。比起其他几种时间语义,使用Event Time的好处是某个事件的时间是确定的,这样能够保证计算结果在一定程度上的可预测性。 一个基于Event Time的Flink程序中必须定义: 每条数据的Event Time时间戳作为Event Tme,如何生成Watermark。我们可以使用数据自带的时间作为Event Time,也可以在数据到达Flink后人为给Event Time赋值。总之,使用Event Time的优势是结果的可预测性,缺点是缓存较大,增加了延迟,且调试和定位问题更复杂。 1.Non-Keyed Tumbling Windows public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //在老版本API中我们使用eventTime作为时间标准时,设置EventTime作为时间标准 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //5S窗口时间范围为[1609430400000,1609430405000) //当前分区中数据的携带最大EventTime - 乱序延迟时间 >= 窗口结束时间 就会触发该窗口 DataStreamSource socketStream = env.socketTextStream("localhost", 8888); //提取数据中的时间,将时间转成精确到毫秒的Long类型,并生成WaterMark,Watermark可以理解为允许数据的延迟时间。调用该方法前后不会对数据格式产生影响。 SingleOutputStreamOperator dataWithWaterMark = socketStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) { @Override//element就是我们传入的数据 public long extractTimestamp(String element) { //提取数据中的时间 return Long.parseLong(element.split(",")[0]); } }); SingleOutputStreamOperator nums = dataWithWaterMark.map(new MapFunction() { @Override public Integer map(String s) throws Exception { return Integer.parseInt(s.split(",")[1]); } }); SingleOutputStreamOperator sum = nums.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).sum(0); sum.print(); env.execute(""); }输入内容: C:\Users\zhibai>nc -lp 8888 1609430400000,1 1609430403000,2 1609430404400,1 1609430404999,3输出结果: 2> 7 2.Keyed Tumbling Windows当使用单并行度source作为数据源时,可以当从该数据源接收到的EventTime满足窗口触发条件时,会将下游的所有并行度窗口都触发。当使用多并行度Source作为数据源是,必须保证从Source的所有并行度都接收到了满足条件的EventTime窗口才会触发。 public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource socketStream = env.socketTextStream("localhost", 8888); SingleOutputStreamOperator map = socketStream.map(new MapFunction() { @Override public Tuple3 map(String s) throws Exception { String[] split = s.split(","); return Tuple3.of(Long.parseLong(split[0]), split[1], Integer.parseInt(split[2])); } }).setParallelism(2); SingleOutputStreamOperator timestampsAndWatermarks = map.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) { @Override public long extractTimestamp(Tuple3 element) { return element.f0; } }); KeyedStream keyedStream = timestampsAndWatermarks.project(1, 2).keyBy(0); SingleOutputStreamOperator sum = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1); sum.print(); env.execute(""); }输入内容: C:\Users\zhibai>nc -lp 8888 1000,saprk,1 1000,flink,1 1000,flink,1 3300,saprk,1 4444,hadoop,1 4999,flink,1 4999,hadoop,1输出结果: 2> (saprk,2) 8> (hadoop,2) 7> (flink,3) 3.Keyed Session WindowsSessionWindos触发也会依据数据的EventTime来触发,程序会依据数据流入的最新的时间来当作当前时间,以此为依据来触发以满足的时间间隔的数据分区。其他与上文提到的TumblingWindows相同。 public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource socketStream = env.socketTextStream("localhost", 8888); //调用该方法前后不会对数据格式产生影响 SingleOutputStreamOperator dataWithWaterMark = socketStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) { @Override//element就是我们传入的数据 public long extractTimestamp(String element) { //提取数据中的时间 return Long.parseLong(element.split(",")[0]); } }); SingleOutputStreamOperator wordAndOne = dataWithWaterMark.map(new MapFunction() { @Override public Tuple2 map(String s) throws Exception { String[] fields = s.split(","); return Tuple2.of(fields[0], Integer.parseInt(fields[1])); } }); KeyedStream keyedStream = wordAndOne.keyBy(new KeySelector() { @Override public String getKey(Tuple2 s) throws Exception { return s.f0; } }); //EventTime WindowedStream eventTimewindow = keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5))); eventTimewindow.sum(1).print(); env.execute(""); }输入内容: C:\Users\zhibai>nc -lp 8888 1000,1 1000,3 2345,1 2345,6 3000,1 8000,1输出结果: 6> (1000,4) 6> (2345,7) 6> (3000,1) |
CopyRight 2018-2019 实验室设备网 版权所有 |